Core: Interface based DataFile reader and writer API #12298
liurenjie1024 left a comment:
Thanks @pvary for this proposal, I left some comments.
I will start to collect the differences here between the different writer types (appender/dataWriter/equalityDeleteWriter/positionalDeleteWriter) for reference:
While I think the goal here is a good one, the implementation looks too complex to be workable in its current form. The primary issue that we currently have is adapting object models (like Iceberg's internal object model) to the file formats. For comparison, this is what the InternalData change looks like at a call site:

```diff
-    switch (format) {
-      case AVRO:
-        AvroIterable<ManifestEntry<F>> reader =
-            Avro.read(file)
-                .project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
-                .createResolvingReader(this::newReader)
-                .reuseContainers()
-                .build();
+    CloseableIterable<ManifestEntry<F>> reader =
+        InternalData.read(format, file)
+            .project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
+            .reuseContainers()
+            .build();
-        addCloseable(reader);
+    addCloseable(reader);
-        return CloseableIterable.transform(reader, inheritableMetadata::apply);
+    return CloseableIterable.transform(reader, inheritableMetadata::apply);
-
-      default:
-        throw new UnsupportedOperationException("Invalid format for manifest file: " + format);
-    }
```

This shows how the per-format switch and the format-specific reader construction collapse into a single builder call.
In this PR, there are a lot of other changes as well. I'm looking at one of the simpler Spark cases in the row reader. The builder is initialized from:

```java
return DataFileServiceRegistry.readerBuilder(
    format, InternalRow.class.getName(), file, projection, idToConstant)
```

There are also new static classes in the file. Each creates a new service and each service creates the builder and object model:

```java
public static class AvroReaderService implements DataFileServiceRegistry.ReaderService {
  @Override
  public DataFileServiceRegistry.Key key() {
    return new DataFileServiceRegistry.Key(FileFormat.AVRO, InternalRow.class.getName());
  }

  @Override
  public ReaderBuilder builder(
      InputFile inputFile,
      Schema readSchema,
      Map<Integer, ?> idToConstant,
      DeleteFilter<?> deleteFilter) {
    return Avro.read(inputFile)
        .project(readSchema)
        .createResolvingReader(schema -> SparkPlannedAvroReader.create(schema, idToConstant));
  }
}
```

In addition, there are now a lot more abstractions:
I think that the next steps are to focus on making this a lot simpler, and there are some good ways to do that:
I'm happy that we agree on the goals. I created this PR to start the conversation. If there are willing reviewers, we can introduce more invasive changes to achieve a better API. I'm all for it!
I think we need to keep these direct transformations to prevent the performance loss that would be caused by multiple transformations (object model -> common model -> file format). We have a matrix of transformations which we need to encode somewhere:
The InternalData reader has one advantage over the data file readers/writers: its object model is static. For the DataFile readers/writers we have multiple object models to handle.
If we allow adding new builders for the file formats, we can remove a good chunk of the boilerplate code. Let me see how this would look (see the sketch after this reply).
We need to refactor the Avro positional delete writer for this, or add a positionalWriterFunc. We also need to consider the format-specific configurations, which are different for the appenders and the delete files (DELETE_PARQUET_ROW_GROUP_SIZE_BYTES vs. PARQUET_ROW_GROUP_SIZE_BYTES).
If we are ok with having a new Builder for the readers/writers, then we don't need the service. It was needed to keep the current APIs and the new APIs compatible.
Will do.
Will see what can be achieved.
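To make the registry idea above concrete, here is a minimal, self-contained sketch of a registry keyed by a (file format, object model) pair. All names (`ToyObjectModelRegistry`, the string keys) are illustrative assumptions, not the API proposed in this PR:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Toy illustration only: a registry that encodes the (file format, object model) matrix by
// mapping each pair to a factory that creates a format-specific builder.
public class ToyObjectModelRegistry<B> {
  // Key is "FORMAT:objectModel", e.g. "PARQUET:spark-internal-row".
  private final Map<String, Function<String, B>> factories = new ConcurrentHashMap<>();

  // A file format module registers its builder factory once per supported object model.
  public void register(String format, String objectModel, Function<String, B> factory) {
    factories.put(format + ":" + objectModel, factory);
  }

  // Engines look up the builder for the (format, object model) pair they need.
  public B builderFor(String format, String objectModel, String inputFileLocation) {
    Function<String, B> factory = factories.get(format + ":" + objectModel);
    if (factory == null) {
      throw new IllegalArgumentException(
          "No builder registered for " + format + " and " + objectModel);
    }
    return factory.apply(inputFileLocation);
  }
}
```

The point of the sketch is only that the format modules contribute the factories, so engines do not need one service class per (format, object model) combination.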
rashworld-max left a comment:
```java
@Override
public ReadBuilder<D, S> set(String key, String value) {
  // Configuration is not used for Avro reader creation
```
Minor: since the read builder could support this, I think this should throw an exception to let the caller know that it is not functional.
This is equivalent to the Parquet.ReadBuilder.set method, which is used to pass user-provided configuration values to the reader. See how it is used in the benchmark (lines 154 to 160 in 157b248):

```java
.set("org.apache.spark.sql.parquet.row.requested_schema", sparkSchema.json())
.set("spark.sql.parquet.binaryAsString", "false")
.set("spark.sql.parquet.int96AsTimestamp", "false")
.set("spark.sql.caseSensitive", "false")
.set("spark.sql.parquet.fieldId.write.enabled", "false")
.set("spark.sql.parquet.inferTimestampNTZ.enabled", "false")
.set("spark.sql.legacy.parquet.nanosAsLong", "false")
```
The caller will not know the actual type of the ReadBuilder, so if we throw an exception here, then use-cases will break. That is why the javadoc for the method says:
Reader builders should ignore configuration keys not known for them.
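A minimal sketch of the "ignore unknown keys" contract, assuming an illustrative builder class and property names (not the actual Avro or Parquet builders):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Toy read builder: callers may pass both format-specific and engine-wide properties through
// the same set() method, so keys this format does not understand are silently dropped
// instead of failing the read.
class ToyReadBuilder {
  private static final Set<String> KNOWN_KEYS =
      Set.of("toy.read.batch-size", "toy.read.case-sensitive");

  private final Map<String, String> config = new HashMap<>();

  ToyReadBuilder set(String key, String value) {
    if (KNOWN_KEYS.contains(key)) {
      config.put(key, value);
    }
    return this; // unknown keys are ignored, matching the javadoc contract quoted above
  }
}
```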
```java
/** Set the projection schema. */
ReadBuilder<D, S> project(Schema schema);

/** Sets the expected output schema. If not provided derived from the {@link #project(Schema)}. */
```
I don't think that this javadoc is very clear because of the reference to project. Calling project sets the Iceberg schema, not the engine schema, and the builder won't do anything to create an engine schema from an Iceberg schema.
I think that this should be "Sets the engine's representation of the projected schema."
We also do not want callers to use this in place of the Iceberg schema because this is opaque to core code -- Iceberg can't verify the engine schema or convert between the two projection representations. We should have a note that says this schema should match the requested Iceberg projection, but may differ in ways that Iceberg considers equivalent. For example, we may use this to exchange the engine's requested shredded representation for a variant, and we could also use this to pass things like specific classes to use for structs (like we have for our internal object model).
Also, what about only allowing this to be set if project is also set? I don't think it is necessary to do this, but I also can't think of a case where you might set an engine schema but not an Iceberg schema.
I think a good example of how this is used comes from the write path, where smallint may be passed to an int field. Here, the engine may want to use a bigint value even though the Iceberg schema is an int.
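As a rough illustration of that write-path example (the class and method names below are made up for this sketch): the engine schema may declare a narrower or wider integer type than the Iceberg column, and the writer adapts the values rather than requiring the engine to convert rows up front.

```java
// Toy column writer for an Iceberg int column. The engine schema decides which of these
// entry points is used; the Iceberg schema stays int in all cases.
class ToyIntColumnWriter {
  // Engine schema says smallint: widen on append.
  void writeShort(short engineValue) {
    writeInt(engineValue);
  }

  // Engine schema says bigint: narrow with a range check.
  void writeLong(long engineValue) {
    if (engineValue < Integer.MIN_VALUE || engineValue > Integer.MAX_VALUE) {
      throw new IllegalArgumentException("Value out of int range: " + engineValue);
    }
    writeInt((int) engineValue);
  }

  void writeInt(int value) {
    // hand the int value to the underlying file format column writer
  }
}
```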
> builder won't do anything to create an engine schema from an Iceberg schema.

I wanted to highlight that setting the outputSchema is not mandatory. If it is not provided, the builder will generate a default representation.
I agree that better doc is needed.
I will reword and add the examples.
I think projection is required anyway, so I updated the javadoc accordingly.
Updated the javadoc. Please take another look.
Thanks!
```java
 * Sets the input schema accepted by the writer. If not provided derived from the {@link
 * #schema(Schema)}.
 */
WriteBuilder<D, S> inputSchema(S schema);
```
I went into a bit of detail on the naming and javadoc for the equivalent method in the read builder. We should do the same things here:
- Consider using `engineSchema` rather than "input"
- Clarify the javadoc: this is the engine schema describing the rows passed to the writer, which may be more specific than the Iceberg schema (for example, `tinyint`, `smallint`, and `int` may be passed when the Iceberg type is `int`).
Updated the javadoc here too.
Please check
```java
/**
 * Sets the input schema accepted by the writer. If not provided derived from the {@link
 * #rowSchema(Schema)}.
```
This isn't true, is it? How would the engine schema be derived from the row schema? The Iceberg schema can be derived from it, but this one can't.
Bad wording. The accepted representation will be derived from the schema.
Copied the javadoc from the WriteBuilder
Please check
```java
EqualityDeleteWriteBuilder<D, S> rowSchema(Schema rowSchema);

/** Sets the equality field ids for the equality delete writer. */
default EqualityDeleteWriteBuilder<D, S> equalityFieldIds(List<Integer> fieldIds) {
```
Shouldn't this default go the other way? Usually we translate varargs versions into lists for the actual implementations.
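For reference, a sketch of the conventional direction mentioned here, with an illustrative interface name: the List overload is the primary method and a varargs default delegates to it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

interface ToyEqualityDeleteWriteBuilder {
  // Primary method: implementations only need to handle the List form.
  ToyEqualityDeleteWriteBuilder equalityFieldIds(List<Integer> fieldIds);

  // Convenience default: translate the varargs version into the List version.
  default ToyEqualityDeleteWriteBuilder equalityFieldIds(int... fieldIds) {
    return equalityFieldIds(Arrays.stream(fieldIds).boxed().collect(Collectors.toList()));
  }
}
```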
In FileMetadata, equality fields are stored as int[], so I used the target type directly to avoid unnecessary conversions.
```java
WriteBuilder<D, S> withAADPrefix(ByteBuffer aadPrefix);

/** Finalizes the configuration and builds the {@link FileAppender}. */
FileAppender<D> build() throws IOException;
```
Why does this throw an IOException when the read path does not?
When file creation fails, ORC wraps the IOException in a RuntimeIOException. Parquet and Avro surface the original IOException. I chose to follow the Parquet and Avro behavior and leave this unchanged.
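A small sketch of the two error-handling styles being compared; the class is illustrative, and java.io.UncheckedIOException stands in here for Iceberg's RuntimeIOException:

```java
import java.io.IOException;
import java.io.UncheckedIOException;

class ToyAppenderBuilder {
  // Parquet/Avro style: the checked IOException from file creation reaches the caller.
  Object buildChecked() throws IOException {
    throw new IOException("file creation failed");
  }

  // ORC style: the same failure is wrapped and surfaces as an unchecked exception.
  Object buildWrapped() {
    try {
      return buildChecked();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```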
```java
 * Sets the input schema accepted by the writer. If not provided derived from the {@link
 * #schema(Schema)}.
 */
DataWriteBuilder<D, S> inputSchema(S schema);
```
This is not derived from the write schema, either. And this can be moved to the common write methods, right? I don't see a difference between the two.
Updated the name and the javadoc based on the comments on the WriteBuilder.
We can't move this to the common methods, as we don't have this in the PositionDeleteWriteBuilder
Here is what the PR does:
ReadBuilder- Builder for reading data from data filesAppenderBuilder- Builder for writing data to data filesObjectModel- Providing ReadBuilders, and AppenderBuilders for the specific data file format and object model pairAppenderBuilder- Builder for writing a fileDataWriterBuilder- Builder for generating a data filePositionDeleteWriterBuilder- Builder for generating a position delete fileEqualityDeleteWriterBuilder- Builder for generating an equality delete fileReadBuilderhere - the file format reader builder is reusedWriterBuilderclass which implements the interfaces above (AppenderBuilder/DataWriterBuilder/PositionDeleteWriterBuilder/EqualityDeleteWriterBuilder) based on a provided file format specificAppenderBuilderObjectModelRegistrywhich stores the availableObjectModels, and engines and users could request the readers (ReadBuilder) and writers (AppenderBuilder/DataWriterBuilder/PositionDeleteWriterBuilder/EqualityDeleteWriterBuilder) from.GenericObjectModels- for reading and writing Iceberg RecordsSparkObjectModels- for reading (vectorized and non-vectorized) and writing Spark InternalRow/ColumnarBatch objectsFlinkObjectModels- for reading and writing Flink RowData objects